ML Pipeline Automation

Installs: 117
Rank: #7330

Install

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'ML Pipeline Automation'
ML Pipeline Automation
ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.
Pipeline Components
Data Ingestion
Collecting data from multiple sources
Data Processing
Cleaning, transformation, feature engineering
Model Training
Training and hyperparameter tuning
Validation
Cross-validation and testing
Deployment
Moving models to production
Monitoring
Tracking performance metrics
Orchestration Platforms
Apache Airflow
Workflow scheduling with DAGs
Kubeflow
Kubernetes-native ML workflows
Jenkins
CI/CD for ML pipelines
Prefect
Modern data flow orchestration
Dagster
# Asset-driven orchestration
#
# --- Python Implementation ---

import json
import logging
import os
from datetime import datetime

import joblib
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Airflow imports
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# MLflow for tracking
import mlflow
import mlflow.sklearn

# Logging setup (original had `getLogger(name)` — the dunder underscores
# were stripped during extraction; `__name__` is the intended module logger)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Modular Pipeline Functions ===")

# Data ingestion

def ingest_data(**context):
    """Generate a synthetic classification dataset and persist it to disk.

    Publishes the CSV path via XCom under key ``data_path`` so the next
    task can locate the raw data.

    Returns:
        dict with ``status`` and the number of ``samples`` ingested.
    """
    logger.info("Starting data ingestion...")
    X, y = make_classification(
        n_samples=2000, n_features=30, n_informative=20, random_state=42
    )
    data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
    data['target'] = y

    # Save to disk (restored missing '=' lost in extraction)
    data_path = '/tmp/raw_data.csv'
    data.to_csv(data_path, index=False)
    context['task_instance'].xcom_push(key='data_path', value=data_path)
    logger.info(f"Data ingested: {len(data)} rows")
    return {'status': 'success', 'samples': len(data)}

# Data processing

def process_data(**context):
    """Clean and preprocess the raw data.

    Pulls the raw CSV path from the ``ingest_data`` task via XCom, imputes
    missing values with column means, drops duplicate rows, filters IQR
    outliers per numeric column, and publishes the processed CSV path
    under XCom key ``processed_path``.

    Returns:
        dict with ``status`` and ``rows_remaining`` after cleaning.
    """
    logger.info("Starting data processing...")

    # Get data path from previous task
    task_instance = context['task_instance']
    data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')
    data = pd.read_csv(data_path)

    # Handle missing values (column-mean imputation)
    data = data.fillna(data.mean())

    # Remove duplicates
    data = data.drop_duplicates()

    # Remove outliers with the 1.5*IQR rule, one numeric column at a time.
    # NOTE(review): numeric_cols includes the 'target' column; for a 0/1
    # target the filter is a no-op, but confirm this is intended.
    numeric_cols = data.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        Q1 = data[col].quantile(0.25)
        Q3 = data[col].quantile(0.75)
        IQR = Q3 - Q1
        # '>=' restored — extraction dropped the operator at a line break
        data = data[(data[col] >= Q1 - 1.5 * IQR) & (data[col] <= Q3 + 1.5 * IQR)]

    processed_path = '/tmp/processed_data.csv'
    data.to_csv(processed_path, index=False)
    task_instance.xcom_push(key='processed_path', value=processed_path)
    logger.info(f"Data processed: {len(data)} rows after cleaning")
    return {'status': 'success', 'rows_remaining': len(data)}

# Feature engineering

def engineer_features(**context):
    """Create interaction and polynomial features from the processed data.

    Pulls the processed CSV path from ``process_data`` via XCom, adds
    pairwise interaction columns for the first few base features plus
    squared versions of the first five, and publishes the result under
    XCom key ``engineered_path``.

    Returns:
        dict with ``status`` and the total number of ``features``.
    """
    logger.info("Starting feature engineering...")
    task_instance = context['task_instance']
    processed_path = task_instance.xcom_pull(key='processed_path', task_ids='process_data')
    data = pd.read_csv(processed_path)

    # Create interaction features between the first 5-6 base features only,
    # to keep the feature count bounded
    feature_cols = [col for col in data.columns if col.startswith('feature_')]
    for i in range(min(5, len(feature_cols))):
        for j in range(i + 1, min(6, len(feature_cols))):
            data[f'interaction_{i}_{j}'] = data[feature_cols[i]] * data[feature_cols[j]]

    # Create polynomial (squared) features for the first five base features
    for col in feature_cols[:5]:
        data[f'{col}_squared'] = data[col] ** 2

    engineered_path = '/tmp/engineered_data.csv'
    data.to_csv(engineered_path, index=False)
    task_instance.xcom_push(key='engineered_path', value=engineered_path)
    logger.info(f"Features engineered: {len(data.columns)} total features")
    return {'status': 'success', 'features': len(data.columns)}

# Train model

def train_model(**context):
    """Train a RandomForest classifier on the engineered features.

    Splits the data 80/20, standardizes features, trains the model,
    evaluates accuracy/F1 on the held-out split, persists both the model
    and the fitted scaler with joblib, publishes their paths via XCom
    (``model_path``, ``scaler_path``), and logs the run to MLflow.

    Returns:
        dict with ``status``, ``accuracy`` and ``f1_score``.
    """
    logger.info("Starting model training...")
    task_instance = context['task_instance']
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Scale features (scaler fit on train only, then applied to test)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=15, random_state=42)
    model.fit(X_train_scaled, y_train)

    # Evaluate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Save model + scaler so validation/deployment can reload them
    model_path = '/tmp/model.pkl'
    scaler_path = '/tmp/scaler.pkl'
    joblib.dump(model, model_path)
    joblib.dump(scaler, scaler_path)
    task_instance.xcom_push(key='model_path', value=model_path)
    task_instance.xcom_push(key='scaler_path', value=scaler_path)

    # Log to MLflow
    with mlflow.start_run():
        mlflow.log_param('n_estimators', 100)
        mlflow.log_param('max_depth', 15)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.log_metric('f1_score', f1)
        mlflow.sklearn.log_model(model, 'model')

    logger.info(f"Model trained: Accuracy={accuracy:.4f}, F1={f1:.4f}")
    return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}

# Validate model

def validate_model(**context):
    """Validate the trained model against a fixed accuracy threshold.

    Reloads the model and scaler from the paths published by
    ``train_model``, re-creates the identical test split (same
    ``random_state``/``test_size`` as training), and publishes a JSON
    validation verdict via XCom under key ``validation_result``.

    Returns:
        dict with ``status`` ('success'/'failed'), ``accuracy``,
        ``threshold`` and an ISO ``timestamp``.
    """
    logger.info("Starting model validation...")
    task_instance = context['task_instance']
    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
    model = joblib.load(model_path)
    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']
    # Same split/seed as training, so this reproduces the exact test set
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
    scaler = joblib.load(scaler_path)
    X_test_scaled = scaler.transform(X_test)

    # Validate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    validation_result = {
        # '>=' restored — the comparison operator was lost in extraction;
        # the 0.85 threshold is stated explicitly below
        'status': 'success' if accuracy >= 0.85 else 'failed',
        'accuracy': accuracy,
        'threshold': 0.85,
        'timestamp': datetime.now().isoformat(),
    }
    task_instance.xcom_push(key='validation_result', value=json.dumps(validation_result))
    logger.info(f"Validation result: {validation_result}")
    return validation_result

# Deploy model

def deploy_model(**context):
    """Deploy the validated model (simulated by copying artifacts).

    Reads the validation verdict published by ``validate_model``; if
    validation failed, deployment is skipped. Otherwise the model and
    scaler files are copied into a "production" directory.

    Returns:
        dict with ``status`` and either ``deploy_path`` or a skip ``reason``.
    """
    logger.info("Starting model deployment...")
    task_instance = context['task_instance']
    validation_result = json.loads(
        task_instance.xcom_pull(key='validation_result', task_ids='validate_model')
    )
    # Gate deployment on the upstream validation verdict
    if validation_result['status'] != 'success':
        logger.warning("Validation failed, deployment skipped")
        return {'status': 'skipped', 'reason': 'validation_failed'}

    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')

    # Simulate deployment (restored missing '=' lost in extraction)
    deploy_path = '/tmp/deployed_model/'
    os.makedirs(deploy_path, exist_ok=True)
    import shutil
    shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
    shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))
    logger.info(f"Model deployed to {deploy_path}")
    return {'status': 'success', 'deploy_path': deploy_path}

# 2. Airflow DAG Definition

print("\n=== 2. Airflow DAG ===")

# The DAG is kept as a string here (this file is a tutorial script, not an
# Airflow DAG file); in a real deployment this content lives in dags/.
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_pipeline_dag',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=ingest_data,
    )

    # Task 2: Process Data
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Task 3: Engineer Features
    engineer = PythonOperator(
        task_id='engineer_features',
        python_callable=engineer_features,
    )

    # Task 4: Train Model
    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    # Task 5: Validate Model
    validate = PythonOperator(
        task_id='validate_model',
        python_callable=validate_model,
    )

    # Task 6: Deploy Model
    deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )

    # Define dependencies
    ingest >> process >> engineer >> train >> validate >> deploy
'''

print("Airflow DAG defined with 6 tasks")

# 3. Pipeline execution summary

print("\n=== 3. Pipeline Execution ===")


class PipelineOrchestrator:
    """Minimal local orchestrator that mimics Airflow's ``task_instance``
    XCom interface (``xcom_push``/``xcom_pull``) so the pipeline functions
    above can run outside Airflow."""

    def __init__(self):
        # Original read `def init(self)` — the dunder underscores were
        # stripped in extraction; without __init__ every instance would
        # lack these attributes and run_pipeline() would crash.
        self.execution_log = []
        self.start_time = None
        self.end_time = None
        # XCom-style key/value store shared by all tasks in one run
        # (original created it lazily via hasattr in xcom_push)
        self.xcom_storage = {}

    def run_pipeline(self):
        """Run all six pipeline stages in order, recording each result."""
        self.start_time = datetime.now()
        logger.info("Starting ML pipeline execution")
        try:
            # Execute pipeline tasks, passing self as the task_instance
            result1 = ingest_data(task_instance=self)
            self.execution_log.append(('ingest_data', result1))
            result2 = process_data(task_instance=self)
            self.execution_log.append(('process_data', result2))
            result3 = engineer_features(task_instance=self)
            self.execution_log.append(('engineer_features', result3))
            result4 = train_model(task_instance=self)
            self.execution_log.append(('train_model', result4))
            result5 = validate_model(task_instance=self)
            self.execution_log.append(('validate_model', result5))
            result6 = deploy_model(task_instance=self)
            self.execution_log.append(('deploy_model', result6))
            self.end_time = datetime.now()
            logger.info("Pipeline execution completed successfully")
        except Exception as e:
            # Best-effort: log and continue so get_summary() still works
            logger.error(f"Pipeline execution failed: {str(e)}")

    def xcom_push(self, key, value):
        """Store a value under *key*, Airflow-XCom style."""
        self.xcom_storage[key] = value

    def xcom_pull(self, key, task_ids):
        """Return the stored value for *key*, or None if absent.

        ``task_ids`` is accepted for Airflow API compatibility but is
        ignored in this single-run local simulation.
        """
        return self.xcom_storage.get(key)

    def get_summary(self):
        """Return start/end times, duration, and the task execution log."""
        duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
        return {
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'duration_seconds': duration,
            'tasks_executed': len(self.execution_log),
            'execution_log': self.execution_log,
        }

# Execute pipeline
# (restored the '=' in `orchestrator = PipelineOrchestrator()` — the
# operator was lost when the source was flattened to one token per line)

orchestrator = PipelineOrchestrator()
orchestrator.run_pipeline()

summary = orchestrator.get_summary()

# Print the scalar summary fields, then the per-task log
print("\n=== Pipeline Summary ===")
for key, value in summary.items():
    if key != 'execution_log':
        print(f"{key}: {value}")

print("\nTask Execution Log:")
for task_name, result in summary['execution_log']:
    print(f"{task_name}: {result}")

print("\nML pipeline automation setup completed!")
Pipeline Best Practices
Modularity
Each step should be independent
Idempotency
Tasks should be safely repeatable
Error Handling
Graceful degradation and alerting
Versioning
Track data, code, and model versions
Monitoring
Track execution metrics and logs
Scheduling Strategies
Daily
Standard for daily retraining
Weekly
For larger feature engineering
On-demand
Triggered by data updates
Real-time
For streaming applications

Deliverables

Automated pipeline DAG
Task dependency graph
Execution logs and monitoring
Performance metrics
Rollback procedures
Documentation
Back to leaderboard